import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.join.JoinFunctionAssigner;
import org.apache.flink.api.java.operators.join.JoinOperatorSetsBase;


public class JoinAlgorithmHints2
{
    public static void main(String[] args) throws Exception
    {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String rootPath = System.getProperty("user.dir");

        DataSet<Tuple2<Integer, String>> input1 = env.readCsvFile("file://" + rootPath + "/" + "group3.csv").types(Integer.class, String.class);
        DataSet<Tuple2<Integer, String>> input2 = env.readCsvFile("file://" + rootPath + "/" + "group4.csv").types(Integer.class, String.class);

        input1.print();
        System.out.println("-------------------------------------------");
        input2.print();


 DataSet< Tuple2<Integer, String>> result1 =input1.join(input2, JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
     @Override
     public Tuple2<Integer, String> join(Tuple2<Integer, String> integerStringTuple2, Tuple2<Integer, String> integerStringTuple22) throws Exception {
         return integerStringTuple2;
     }
 });
// hint that the second DataSet is very small


        DataSet<Tuple2<Integer, String>> result2 = input1.rightOuterJoin(input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE) .where(1).equalTo(1).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>() {
            @Override
            public Tuple2<Integer, String> join(Tuple2<Integer, String> integerStringTuple2, Tuple2<Integer, String> integerStringTuple22) throws Exception {
                return integerStringTuple22;
            }
        });
//两个dataset根据第0列数据进行join操作
        System.out.println("-------------------------------print result1-------------------------------------------");
        result1.print();
        System.out.println("-------------------------------print result2-------------------------------------------");
        result2.print();
    }

}



//这里.where(0).equalTo(0)的意思是:
//数据集A的第0列等于数据集B的第0列，将符合该规律的数据筛选出来。

//上述代码调试的时候要注意一个基本的常识:
//leftOuterJoin,那么Null肯定只能出现在右侧
//rightOuterJoin,那么Null肯定只能出现在左侧
//上述实例是受到下面的例子启发的:
//https://www.cnblogs.com/williamjie/p/9498032.html